package org.budo.warehouse.logic.consumer;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.budo.support.dao.page.Page;
import org.budo.support.lang.util.ListUtil;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.IEventFilterLogic;
import org.budo.warehouse.logic.bean.LogicDynamicBeanProvider;
import org.budo.warehouse.logic.producer.DataMessageImpl;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.api.IFieldMappingService;
import org.budo.warehouse.service.api.IPipelineService;
import org.budo.warehouse.service.entity.FieldMapping;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.stereotype.Component;

/**
 * @author limingwei
 */
@Component("dispatcherDataConsumer")
public class DispatcherDataConsumer implements DataConsumer {
    @Resource
    private IPipelineService pipelineService;

    @Resource
    private IDataNodeService dataNodeService;

    @Resource
    private LogicDynamicBeanProvider logicDynamicBeanProvider;

    @Resource
    private IEventFilterLogic eventFilterLogic;

    @Resource
    private IFieldMappingService fieldMappingService;

    @Override
    public void consume(DataMessage dataMessage) {
        Integer dataNodeId = dataMessage.getDataNodeId();
        List<Pipeline> pipelines = pipelineService.listBySourceDataNodeCached(dataNodeId, Page.max());

        for (Pipeline pipeline : pipelines) {
            DataMessage filteredDataMessage = eventFilterLogic.filter(pipeline, dataMessage);
            if (null == filteredDataMessage || ListUtil.isNullOrEmpty(filteredDataMessage.getDataEntries())) {
                continue; // 事件过滤
            }

            DataMessage fieldMappingDataMessage = this.fieldMappingDataMessage(filteredDataMessage, pipeline);
            this.pipelineHandleEntry(fieldMappingDataMessage, pipeline);
        }
    }

    private DataMessage fieldMappingDataMessage(DataMessage dataMessage, Pipeline pipeline) {
        Boolean originalFields = fieldMappingService.findOriginalFieldsValueByPipelineIdCached(pipeline.getId());
        List<FieldMapping> fieldMappings = fieldMappingService.listByPipelineIdCached(pipeline.getId()); // 字段对应关系
        if (null == originalFields) {
            originalFields = null == fieldMappings || fieldMappings.isEmpty(); // 如果未配置originalFields就以对应关系是否为空判断
        }

        List<DataEntry> dataEntries = dataMessage.getDataEntries();
        List<DataEntry> fieldMappingDataEntries = new ArrayList<DataEntry>(dataEntries.size());
        for (DataEntry dataEntry : dataEntries) {
            FieldMappingDataEntry fieldMappingDataEntry = new FieldMappingDataEntry(dataEntry, pipeline, originalFields, fieldMappings);
            fieldMappingDataEntries.add(fieldMappingDataEntry);
        }

        return new DataMessageImpl(dataMessage.getDataNodeId(), fieldMappingDataEntries);
    }

    private void pipelineHandleEntry(DataMessage dataMessage, Pipeline pipeline) {
        DataConsumer dataConsumer = logicDynamicBeanProvider.dataConsumer(pipeline);
        dataConsumer.consume(dataMessage);
    }
}